home *** CD-ROM | disk | FTP | other *** search
- rem
- rem $Header: prvtalrt.sql 7020100.1 94/09/23 22:13:16 cli Generic<base> $
- rem
- Rem
- Rem NAME
- Rem prvtalrt.sql - Blocking implementation of DBMS "alerts"
- Rem DESCRIPTION
- Rem These are private functions to be released in PL/SQL binary form.
- Rem Routines to wait-for, and signal, a named event. The waiting
- Rem session will block in the database until the event occurs, or until
- Rem a timeout expires. The implementation avoids polling except when
- Rem running in parallel server mode.
- Rem RETURNS
- Rem
- Rem NOTES
- Rem The procedural option is needed to use this facility.
- Rem The package body is private. The public package specification is
- Rem in dbmsalrt.sql.
- Rem
- Rem MODIFIED (MM/DD/YY)
- Rem adowning 03/29/94 - merge changes from branch 1.1.710.1
- Rem adowning 02/10/94 - Branch_for_patch
- Rem adowning 02/10/94 - Creation
- Rem adowning 02/10/94 - Creation
- Rem mmoore 03/17/93 - merge changes from branch 1.6.312.1
- Rem mmoore 03/11/92 - #(153818) fix looping in signal upon cleanup
- Rem rkooi 12/03/92 - #141803, improve some comments
- Rem rkooi 11/25/92 - allow signalling and waiting in same session
- Rem rkooi 11/17/92 - pipe cleanup bug
- Rem rkooi 11/12/92 - don't call removeall from signal
- Rem rkooi 08/12/92 - surface removeall function
- Rem rkooi 06/05/92 - Creation
- REM
-
- drop table dbms_alert_info
- /
- create table dbms_alert_info
- (
- name varchar2(30),
- sid varchar2(30),
- changed varchar2(1),
- message varchar2(1800),
- primary key (name, sid)
- )
- /
-
- create or replace package body dbms_alert is
- p_int number := 5; -- poll once every 5 seconds iff
- -- it is needed.
- this_session_id varchar2(30) := dbms_session.unique_session_id;
- parallel boolean := dbms_utility.is_parallel_server;
- sigpipe varchar2(30) := 'ORA$ALERT$' || this_session_id;
- msgseq binary_integer := 0;
- firstregister boolean := TRUE;
- instantiating_pkg boolean := TRUE;
-
- function minimum(v1 number, v2 number) return number is
- begin
- if v1 < v2 then
- return v1;
- else
- return v2;
- end if;
- end;
-
- function alert_hash(name varchar2) return integer is
- hashval binary_integer := 0;
- strlen binary_integer := lengthb(name);
- begin
- for i in 1..strlen loop
- hashval := hashval + ascii(substrb(name,i,1));
- end loop;
- return 2000000000 + (hashval mod 1021);
- end;
-
- function session_hash(name varchar2) return integer is
- hashval binary_integer := 0;
- strlen binary_integer := lengthb(name);
- begin
- for i in 1..strlen loop
- hashval := hashval + ascii(substrb(name,i,1));
- end loop;
- return 2000001021 + (hashval mod 1021);
- end;
-
- procedure set_defaults(sensitivity in number) is
- begin
- if sensitivity >= 0 then
- p_int := sensitivity;
- end if;
- end;
-
- procedure register(name in varchar2) is
- status integer;
- lstatus integer;
- lockid integer;
- cursor c1 is
- select distinct substr(kglnaobj,11) sid from x$kglob
- where kglhdnsp = 7
- and kglnaobj like 'ORA$ALERT$%'
- and bitand(kglhdflg,128)!=0
- union
- select distinct sid from dbms_alert_info;
- begin
- if instantiating_pkg then
- removeall;
- instantiating_pkg := FALSE;
- end if;
-
- if (firstregister) then
- -- See if there are any orphaned pipes that should be cleaned up
- for rec in c1 loop
- -- see if the session is alive
- lockid := session_hash(rec.sid);
- lstatus := dbms_lock.request(lockid, dbms_lock.x_mode,
- timeout => 0, release_on_commit => TRUE);
- if lstatus = 0 then
- -- session must be dead so cleanup
- dbms_pipe.purge('ORA$ALERT$' || rec.sid);
- delete dbms_alert_info where sid = rec.sid;
- commit;
- elsif lstatus not in (1,2,4) then -- timeout, deadlock, already own
- raise_application_error(-20000,
- 'ORU-10025: lock request error, status: ' || to_char(lstatus));
- end if;
- end loop;
-
- -- get lock to indicate that this session is alive. Status 4 can occur
- -- if this package gets reinstantiated and we already have this lock.
- -- Use s_mode in case there is a hash collision between two session ids.
- -- Use timeout of 60 in case someone is cleaning up this session id.
- lstatus := dbms_lock.request(session_hash(this_session_id),
- dbms_lock.s_mode, timeout => 60);
- if lstatus != 0 and lstatus != 4 then
- raise_application_error(-20000,
- 'ORU-10021: lock request error, status: ' || to_char(lstatus));
- end if;
- firstregister := FALSE;
- end if;
-
- -- Make sure user A does not register for this alert between the time
- -- user B signals the alert and the time user B commits. Otherwise the
- -- following sequence can occur: B signals (updates dbms_alert_info), A
- -- registers (adds new entry to dbms_alert_info that B will not have
- -- updated), A reads the data covered by the alert, B commits (causing
- -- new data to be written), A does a wait. A will not see B's signal.
- status := dbms_lock.request(alert_hash(upper(name)), dbms_lock.x_mode,
- dbms_lock.maxwait, release_on_commit => TRUE);
- if status != 0 then
- raise_application_error(-20000,
- 'ORU-10002: lock request error, status: ' || to_char(status));
- end if;
-
- insert into dbms_alert_info values (upper(register.name), this_session_id,
- 'N', NULL);
- commit;
-
- exception
- when dup_val_on_index then commit; -- commit to release the lock
- end;
-
- procedure remove(name in varchar2) is
- begin
- if instantiating_pkg then
- removeall;
- instantiating_pkg := FALSE;
- end if;
-
- delete from dbms_alert_info
- where name = upper(remove.name)
- and sid = this_session_id;
- commit;
- end;
-
- procedure pipe_wait(maxtime number, cumtime in out number) is
- status integer;
- tmo number := maxtime;
- begin
- -- the time to wait is:
- -- if running in parallel mode then we must effectively poll since pipes
- -- do not work across instances yet. This will be fixed when
- -- pipes are upgraded to work in parallel mode. So wait for 'p_int'.
- -- if not parallel mode then don't
- -- if parallel mode then do polling loop since pipes do not
- -- yet work parallel mode
- if parallel then
- tmo := minimum(tmo, p_int);
- end if;
- if tmo = maxwait then
- tmo := dbms_pipe.maxwait; -- map to dbms_pipe's idea of maxwait
- end if;
-
- status := dbms_pipe.receive_message(sigpipe, tmo);
- if status = 1 then
- cumtime := cumtime + tmo;
- return;
- end if;
- if status <> 0 then
- raise_application_error(-20000, 'ORU-10015: error:' || to_char(status)
- || ' waiting for pipe message.');
- end if;
- return;
- end;
-
- -- optimistic pass for waitany. keeps from waiting on a pending
- -- transaction if there exists some other, committed, alert.
- procedure optimistic(
- name out varchar2,
- message out varchar2,
- status out integer)
- is
- lockid integer;
- lstatus integer;
- cursor c1 is
- select name from dbms_alert_info
- where sid = this_session_id
- and changed = 'Y';
- begin
- status := 1;
- for rec in c1 loop
- lockid := alert_hash(rec.name);
- lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, timeout => 0,
- release_on_commit => TRUE);
- if lstatus <> 1 then
- if lstatus <> 0 then
- raise_application_error(-20000, 'ORU-10019: error ' ||
- to_char(lstatus) || ' on lock request.');
- end if;
- update dbms_alert_info set changed = 'N'
- where sid = this_session_id
- and name = rec.name;
- select message into message from dbms_alert_info
- where sid = this_session_id
- and name = rec.name;
- commit;
- dbms_pipe.purge(sigpipe); -- just to avoid unnecessary work next time
- name := rec.name;
- status := 0;
- return;
- end if;
- end loop;
- return;
- end;
-
- procedure waitany(
- name out varchar2,
- message out varchar2,
- status out integer,
- timeout in number default maxwait)
- is
- waitime number := 0;
- cumtime number := 0;
- lockid integer;
- st integer;
- lstatus integer;
- timedout boolean;
- changed varchar2(1);
- foundone boolean;
- cursor c1 is
- select name from dbms_alert_info
- where sid = this_session_id;
- begin
- if instantiating_pkg then
- removeall;
- instantiating_pkg := FALSE;
- end if;
-
- optimistic(name, message, st);
- if st = 0 then
- status := st;
- return;
- end if;
- waitime := 1;
- cumtime := 0;
- loop
- timedout := FALSE;
- foundone := FALSE;
- for rec in c1 loop
- foundone := TRUE;
- lockid := alert_hash(rec.name);
- lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, waitime,
- release_on_commit => TRUE);
- if lstatus = 1 then -- timed out
- optimistic(name, message, st); -- see if anyone else committed
- -- in the meantime...
- if st = 0 then -- someone *did* commit, so alert happened...
- status := 0;
- return;
- end if;
- cumtime := cumtime + waitime;
- if cumtime >= timeout then
- status := 1; -- exceeded caller-specified timeout
- return;
- end if;
- timedout := TRUE;
- goto continue;
- elsif lstatus <> 0 then
- raise_application_error(-20000,
- 'ORU-10020: error ' || to_char(lstatus) || ' on lock request.');
- else
- -- now that we have the row covered by a lock we can select
- -- the changed and message columns from it.
- select changed, message into changed, message from dbms_alert_info
- where sid = this_session_id
- and name = rec.name;
- if changed = 'Y' then -- alert occurred
- update dbms_alert_info set changed = 'N'
- where sid = this_session_id
- and name = rec.name;
- commit;
-
- name := rec.name;
- status := 0;
- dbms_pipe.purge(sigpipe);
- return;
- end if;
- lstatus := dbms_lock.release(lockid);
- end if;
- <<continue>>
- null; -- there is no 'continue' stmt in pl/sql
- end loop;
- if not foundone then
- raise_application_error(-20000,
- 'ORU-10024: there are no alerts registered.');
- end if;
-
- if timedout then
- waitime := minimum(waitime*2, 32); -- do exponential backoff, max at 32
- waitime := minimum(waitime, timeout-cumtime);
- else
- -- nothing to wait on so wait on pipe
- pipe_wait(timeout-cumtime, cumtime);
- end if;
- if cumtime >= timeout then
- status := 1;
- return;
- end if;
-
- end loop;
- end;
-
- procedure waitone(
- name in varchar2,
- message out varchar2,
- status out integer,
- timeout in number default maxwait)
- is
- cumtime number := 0;
- lockid integer := alert_hash(upper(name));
- lstatus integer;
- begin
- if instantiating_pkg then
- removeall;
- instantiating_pkg := FALSE;
- end if;
-
- loop
- lstatus := dbms_lock.request(lockid, dbms_lock.sx_mode, timeout-cumtime,
- release_on_commit => TRUE);
- if lstatus = 1 then
- status := 1;
- return;
- end if;
- if lstatus = 4 then
- raise_application_error(-20000,
- 'ORU-10037: attempting to wait on uncommitted signal from same session');
- end if;
- if lstatus <> 0 then
- raise_application_error(-20000,
- 'ORU-10023: error ' || to_char(lstatus) || ' on lock request.');
- end if;
- update dbms_alert_info set changed = 'N'
- where name = upper(waitone.name)
- and sid = this_session_id
- and changed = 'Y';
- if sql%rowcount != 0 then
- select message into message from dbms_alert_info
- where name = upper(waitone.name)
- and sid = this_session_id;
- commit;
-
- dbms_pipe.purge(sigpipe); -- discard unneeded msgs
- status := 0;
- return;
- end if;
- lstatus := dbms_lock.release(lockid);
-
- -- wait for timeout, or until a message arrives on the pipe. If
- -- parallel mode then don't wait longer than p_int.
- pipe_wait(timeout, cumtime);
- if cumtime >= timeout then
- status := 1;
- return;
- end if;
- end loop;
- end;
-
- procedure signal_pipe(pipename varchar2) is
- msgid varchar2(40);
- tmpmsgid varchar2(40);
- status integer;
- begin
- msgid := this_session_id || ':' || to_char(msgseq);
- msgseq := msgseq + 1;
- dbms_pipe.pack_message(msgid);
- status := dbms_pipe.send_message(pipename);
- if status <> 0 then
- raise_application_error(-20000,
- 'ORU-10016: error:' || to_char(status) || ' sending on pipe ' ||
- pipename);
- end if;
-
- -- remove dup signals from the pipe
- status := dbms_pipe.receive_message(pipename, 0);
- if status = 1 then
- -- receiver has already taken signal off of pipe (or pipe is busy)
- return;
- end if;
- if status <> 0 then
- raise_application_error(-20000,
- 'ORU-10017: error:' || to_char(status) || ' receiving on pipe ' ||
- pipename);
- end if;
- dbms_pipe.unpack_message(tmpmsgid);
- if tmpmsgid = msgid then
- -- it was our message so put it back on
- dbms_pipe.pack_message(msgid);
- status := dbms_pipe.send_message(pipename);
- if status <> 0 then
- raise_application_error(-20000,
- 'ORU-10018: error:' || to_char(status) || ' sending on pipe ' ||
- pipename);
- end if;
- end if;
- end;
-
- procedure signal(name in varchar2, message in varchar2) is
- status integer;
- cursor c2(alertname varchar2) is
- select sid from dbms_alert_info
- where name = upper(alertname);
- begin
- status := dbms_lock.request(alert_hash(upper(name)), dbms_lock.s_mode,
- dbms_lock.maxwait, release_on_commit => TRUE);
- -- status 4 means we already own this lock which happens if this alert
- -- is signalled more than once during this transaction, or if we are
- -- signalling multiple different alerts during this transaction and
- -- there is a hash collision on the alert name.
- if status != 0 and status != 4 then
- raise_application_error(-20000,
- 'ORU-10001: lock request error, status: ' || to_char(status));
- end if;
-
- -- We cannot add the clause "and changed = 'N'" since we need to guarantee
- -- that at commit time changed is 'Y'. If we add the clause then a
- -- waiter can reset it to 'N' prior to our commit.
- update dbms_alert_info set changed = 'Y', message = signal.message
- where name = upper(signal.name);
-
- -- signal all interested sessions that "something has happened".
- -- The sessions need to wakeup and then check dbms_alert_info
- -- to find out what, if anything, did happen since this transaction
- -- could rollback after the message is sent.
- for rec in c2(name) loop
- -- make sure requesting session is still alive. cleanup if not.
- status := dbms_lock.request(session_hash(rec.sid), dbms_lock.sx_mode,
- timeout => 0, release_on_commit => TRUE);
- if status = 0 then
- -- ooops, we should not have been able to aquire this lock.
- -- The session must be dead. cleanup
-
- -- don't delete from dbms_alert_info here cause could cause
- -- deadlocks. Just wait for next register call to get
- -- dbms_alert_info cleaned up.
- dbms_pipe.purge('ORA$ALERT$' || rec.sid);
- status := dbms_lock.release(session_hash(rec.sid));
- else
- -- 1 is timeout, 4 is we already own this lock. 4 can happen if
- -- this session is registered for this alert.
- if status != 1 and status != 4 then
- raise_application_error(-20000,
- 'ORU-10022: lock request error, status: ' || to_char(status));
- end if;
-
- -- signal on pipe even if parallel mode since the waiter might
- -- be on this instance.
- signal_pipe('ORA$ALERT$' || rec.sid);
- end if;
-
- end loop;
- end;
-
- procedure removeall is
- begin
- delete from dbms_alert_info where sid = this_session_id;
- dbms_pipe.purge(sigpipe);
- commit;
- end;
-
- end;
- /
- drop public synonym dbms_alert
- /
- create public synonym dbms_alert for dbms_alert
- /
-